1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: TopicDestinationCache.java,v 1.6 2005/12/20 20:39:45 tanderson Exp $
44 */
45 package org.exolab.jms.messagemgr;
46
47 import java.sql.Connection;
48 import java.util.ArrayList;
49 import java.util.Iterator;
50 import java.util.List;
51 import java.util.Vector;
52 import javax.jms.JMSException;
53
54 import org.exolab.jms.client.JmsDestination;
55 import org.exolab.jms.client.JmsTopic;
56 import org.exolab.jms.lease.LeaseManager;
57 import org.exolab.jms.message.MessageImpl;
58 import org.exolab.jms.persistence.DatabaseService;
59 import org.exolab.jms.persistence.PersistenceException;
60
61
62 /***
63 * A {@link DestinationCache} for topics.
64 *
65 * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
66 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
67 * @version $Revision: 1.6 $ $Date: 2005/12/20 20:39:45 $
68 */
69 class TopicDestinationCache extends AbstractDestinationCache {
70
71 /***
72 * Construct a new <code>TopicDestinationCache</code>.
73 *
74 * @param topic the topic to cache messages for
75 * @param database the database service
76 * @param leases the lease manager
77 */
78 public TopicDestinationCache(JmsTopic topic, DatabaseService database,
79 LeaseManager leases) {
80 super(topic, database, leases);
81 }
82
83 /***
84 * Register a consumer with this cache.
85 *
86 * @param consumer the message consumer for this destination
87 * @return <code>true</code> if registered; otherwise <code>false</code>
88 */
89 public boolean addConsumer(ConsumerEndpoint consumer) {
90
91 boolean result = false;
92
93
94
95 JmsTopic cdest = (JmsTopic) consumer.getDestination();
96 JmsTopic ddest = (JmsTopic) getDestination();
97
98 if (cdest.match(ddest)) {
99 result = super.addConsumer(consumer);
100 }
101
102 return result;
103 }
104
105 /***
106 * Invoked when the {@link MessageMgr} receives a non-persistent message.
107 *
108 * @param destination the message's destination
109 * @param message the message
110 * @throws JMSException if the listener fails to handle the message
111 */
112 public void messageAdded(JmsDestination destination, MessageImpl message)
113 throws JMSException {
114 boolean processed = false;
115 MessageRef reference =
116 new CachedMessageRef(message, false, getMessageCache());
117
118 reference.reference();
119
120
121
122 addMessage(reference, message);
123 MessageHandle handle = new SharedMessageHandle(this, reference,
124 message);
125
126 ConsumerEndpoint[] consumers = getConsumerArray();
127 for (int index = 0; index < consumers.length; index++) {
128 ConsumerEndpoint consumer = consumers[index];
129 processed |= consumer.messageAdded(handle, message);
130 }
131
132
133
134 if (processed) {
135 checkMessageExpiry(reference, message);
136 reference.dereference();
137 } else {
138
139 reference.destroy();
140
141
142 }
143 }
144
145 /***
146 * Invoked when the {@link MessageMgr} receives a persistent message.
147 *
148 * @param destination the message's destination
149 * @param message the message
150 * @throws JMSException if the listener fails to handle the message
151 * @throws PersistenceException if there is a persistence related problem
152 */
153 public void persistentMessageAdded(JmsDestination destination,
154 MessageImpl message)
155 throws JMSException, PersistenceException {
156 boolean processed = false;
157 MessageRef reference = new CachedMessageRef(message, true,
158 getMessageCache());
159 reference.reference();
160
161
162
163 addMessage(reference, message);
164 SharedMessageHandle handle = new SharedMessageHandle(this, reference,
165 message);
166
167
168 ConsumerEndpoint[] consumers = getConsumerArray();
169 for (int index = 0; index < consumers.length; index++) {
170 ConsumerEndpoint consumer = consumers[index];
171 processed |= consumer.persistentMessageAdded(handle, message);
172 }
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195 if (processed) {
196 checkMessageExpiry(reference, message);
197 reference.dereference();
198 } else {
199
200 handle.destroy();
201
202
203
204 }
205
206 }
207
208 /***
209 * Return a message handle back to the cache, to recover unsent or
210 * unacknowledged messages.
211 *
212 * @param handle the message handle to return
213 */
214 public void returnMessageHandle(MessageHandle handle) {
215 long consumerId = handle.getConsumerId();
216 AbstractTopicConsumerEndpoint endpoint =
217 (AbstractTopicConsumerEndpoint) getConsumerEndpoint(consumerId);
218
219
220 if (endpoint != null) {
221 endpoint.returnMessage(handle);
222 } else {
223
224 }
225 }
226
227 /***
228 * Load the state of a durable consumer.
229 *
230 * @param name the durable subscription name
231 * @return a list of {@link MessageHandle} instances
232 * @throws JMSException for any JMS error
233 */
234 public List getDurableMessageHandles(String name)
235 throws JMSException, PersistenceException {
236 DatabaseService service = DatabaseService.getInstance();
237 Connection connection = service.getConnection();
238 Vector handles = service.getAdapter().getMessageHandles(
239 connection, getDestination(), name);
240 List result = new ArrayList(handles.size());
241
242 MessageCache cache = getMessageCache();
243
244 Iterator iterator = handles.iterator();
245 while (iterator.hasNext()) {
246 PersistentMessageHandle handle =
247 (PersistentMessageHandle) iterator.next();
248 String messageId = handle.getMessageId();
249 MessageRef reference = cache.getMessageRef(messageId);
250 if (reference == null) {
251 reference = new CachedMessageRef(messageId, true, cache);
252 }
253 cache.addMessageRef(reference);
254 handle.reference(reference);
255 handle.setDestinationCache(this);
256 result.add(handle);
257
258 checkMessageExpiry(reference, handle.getExpiryTime());
259 }
260 return result;
261 }
262
263 /***
264 * Remove an expired persistent message, and notify any listeners.
265 *
266 * @param reference a handle to the expired message
267 * @throws JMSException if a listener fails to handle the
268 * expiration
269 * @throws PersistenceException if there is a persistence related problem
270 */
271 protected void persistentMessageExpired(MessageRef reference)
272 throws JMSException, PersistenceException {
273 String messageId = reference.getMessageId();
274 ConsumerEndpoint[] consumers = getConsumerArray();
275
276 for (int i = 0; i < consumers.length; ++i) {
277 consumers[i].persistentMessageRemoved(messageId);
278 }
279 }
280
281 }
282